package io.feige.rpc.producer;

import io.feige.rpc.exception.NoServiceFoundException;
import io.feige.rpc.producer.config.ProducerConfig;
import io.feige.rpc.producer.network.ProducerConnectionService;
import io.feige.rpc.producer.network.implement.ProducerConnectionServiceImpl;
import io.feige.rpc.protocol.Protocol;

import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
 

/**
 * Producer (server) side entry point of the RPC framework.
 *
 * <p>Holds the {@link ProducerConfig}, registers service objects through
 * {@code ExportedServiceFactory}, owns the worker thread pool used by
 * {@link #execute(Runnable)} and starts/stops the
 * {@link ProducerConnectionService}.
 *
 * <p>Typical usage: construct, optionally call {@link #setServices(List)},
 * then {@link #start()}; call {@link #stop()} to shut down.
 */
public class RpcProducerService {

	private static final Logger logger = LoggerFactory.getLogger(RpcProducerService.class);

	/** Classpath resource read by the no-argument constructor. */
	private static final String CONFIG_RESOURCE = "feige.properties";

	/** Creates the worker threads, named {@code feige-rpc-thread:N} with N counting up from 0. */
	private final ThreadFactory threadFactory = new ThreadFactory() {
		private final AtomicInteger integer = new AtomicInteger();

		public Thread newThread(Runnable r) {
			return new Thread(r, "feige-rpc-thread:" + integer.getAndIncrement());
		}
	};

	/** Created in {@link #start()}; {@code null} until then. */
	private ProducerConnectionService producerConnectionService;

	/** Created in {@link #start()}; {@code null} until then. */
	private ExecutorService threadPool;

	private ProducerConfig config;

	/** Service objects exported in {@link #start()}; may be {@code null}. */
	private List<Object> services;

	/**
	 * Creates a producer configured from {@code feige.properties} on the classpath.
	 *
	 * <p>Recognised keys: {@code feige.host} (optional), {@code feige.protocol}
	 * (optional, class name of a {@link Protocol} implementation with a no-arg
	 * constructor), {@code feige.port} (required integer) and
	 * {@code feige.poolsize} (required integer).
	 *
	 * @throws RuntimeException if the resource is missing, cannot be read, or
	 *         contains a missing/invalid value; the underlying problem is the cause
	 */
	public RpcProducerService() {
		logger.info("init config feige.properties");
		this.config = loadConfig();
	}

	/**
	 * Creates a producer that uses the given, already populated configuration.
	 *
	 * @param config configuration used by {@link #start()} and {@link #getProtocol()}
	 */
	public RpcProducerService(ProducerConfig config) {
		this.config = config;
	}

	/** Reads {@code feige.properties} from the classpath into a new {@link ProducerConfig}. */
	private static ProducerConfig loadConfig() {
		InputStream inStream = RpcProducerService.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE);
		if (inStream == null) {
			// getResourceAsStream signals "no such resource" with null; report that
			// explicitly instead of letting Properties.load fail with an opaque NPE.
			logger.error("feige.properties not found in classpath");
			throw new RuntimeException(CONFIG_RESOURCE + " not found in classpath");
		}
		try {
			Properties pro = new Properties();
			pro.load(inStream);

			ProducerConfig loaded = new ProducerConfig();
			String host = pro.getProperty("feige.host");
			if (host != null) {
				loaded.setHost(host);
			}
			String protocolClass = pro.getProperty("feige.protocol");
			if (protocolClass != null) {
				loaded.setProtocol(newProtocol(protocolClass.trim()));
			}
			loaded.setPort(requiredInt(pro, "feige.port"));
			loaded.setPoolSize(requiredInt(pro, "feige.poolsize"));
			return loaded;
		} catch (Exception e) {
			// Log the real cause: the failure may be an unreadable file, a missing
			// key, a malformed number or an unloadable protocol class.
			logger.error("failed to load configuration from " + CONFIG_RESOURCE, e);
			throw new RuntimeException(e);
		} finally {
			// Properties.load leaves the stream open, so it must be closed here.
			try {
				inStream.close();
			} catch (Exception ignored) {
				// Nothing useful can be done if closing a classpath resource fails.
			}
		}
	}

	/** Instantiates the {@link Protocol} implementation named by {@code className}. */
	private static Protocol newProtocol(String className) {
		try {
			return (Protocol) Class.forName(className).newInstance();
		} catch (Exception e) {
			throw new IllegalArgumentException("cannot instantiate feige.protocol class " + className, e);
		}
	}

	/** Returns the integer value of a mandatory property, failing with a message naming the key. */
	private static int requiredInt(Properties pro, String key) {
		String value = pro.getProperty(key);
		if (value == null) {
			throw new IllegalArgumentException("missing required property " + key + " in " + CONFIG_RESOURCE);
		}
		try {
			return Integer.parseInt(value.trim());
		} catch (NumberFormatException e) {
			throw new IllegalArgumentException("property " + key + " is not a valid integer: " + value, e);
		}
	}

	/**
	 * Exports every interface implemented directly by each configured service
	 * object, creates the worker thread pool sized by
	 * {@code config.getPoolSize()}, and starts the connection service.
	 */
	public void start() {
		if (services != null) {
			for (Object service : services) {
				for (Class<?> inter : service.getClass().getInterfaces()) {
					logger.info("export service {}", inter.getName());
					exportService(inter, service);
				}
			}
		}
		threadPool = Executors.newFixedThreadPool(config.getPoolSize(), threadFactory);
		producerConnectionService = new ProducerConnectionServiceImpl();
		producerConnectionService.start(this, config);
	}

	/**
	 * Stops the connection service and shuts the worker pool down immediately.
	 *
	 * <p>Safe to call when {@link #start()} was never called or did not
	 * complete: components that were not created are skipped. The thread pool
	 * is shut down even if stopping the connection service throws.
	 */
	public void stop() {
		try {
			if (producerConnectionService != null) {
				producerConnectionService.stop();
			}
		} finally {
			if (threadPool != null) {
				threadPool.shutdownNow();
			}
		}
	}

	/** @return the configuration this producer was created with */
	public ProducerConfig getConfig() {
		return config;
	}

	/**
	 * Registers {@code service} under {@code clazz} with {@code ExportedServiceFactory}.
	 *
	 * @param clazz   the type to register the service under
	 * @param service the object that will handle invocations
	 */
	public void exportService(Class<?> clazz, Object service) {
		ExportedServiceFactory.addService(clazz, service);
	}

	/**
	 * Reflectively invokes a method on an exported service.
	 *
	 * <p>The method is resolved with {@code getDeclaredMethod} on the class
	 * loaded via {@code Class.forName(serviceName)}, so {@code serviceName}
	 * must be a loadable class name and the method must be declared directly
	 * on that type.
	 *
	 * @param serviceName name used to look the service up in {@code ExportedServiceFactory}
	 * @param methodName  name of the method to call
	 * @param types       parameter types of the method; ignored when {@code args} is {@code null}
	 * @param args        call arguments, or {@code null} to call the no-argument method
	 * @return the value returned by the invoked method
	 * @throws NoServiceFoundException if no service is registered under {@code serviceName}
	 * @throws Exception any reflection failure, including
	 *         {@code InvocationTargetException} wrapping an exception thrown by the service method
	 */
	public Object invoke(String serviceName, String methodName, Class<?>[] types, Object[] args) throws Exception {
		logger.debug("service invoke, serviceName:{}, methodName:{}, args:{}", serviceName, methodName, args);
		Object service = ExportedServiceFactory.getService(serviceName);
		if (service == null) {
			throw new NoServiceFoundException();
		}
		Class<?> serviceType = Class.forName(serviceName);
		if (args == null) {
			// A null argument array means "call the no-argument overload".
			Method method = serviceType.getDeclaredMethod(methodName);
			return method.invoke(service);
		}
		Method method = serviceType.getDeclaredMethod(methodName, types);
		return method.invoke(service, args);
	}

	/**
	 * Submits a task to the worker thread pool created by {@link #start()}.
	 *
	 * @param runnable the task to run
	 */
	public void execute(Runnable runnable) {
		threadPool.execute(runnable);
	}

	/** @return the protocol held by the configuration */
	public Protocol getProtocol() {
		return config.getProtocol();
	}

	/** @return the service objects to export on {@link #start()}, possibly {@code null} */
	public List<Object> getServices() {
		return services;
	}

	/**
	 * Sets the service objects whose interfaces are exported by {@link #start()}.
	 *
	 * @param services service implementations; {@code null} exports nothing
	 */
	public void setServices(List<Object> services) {
		this.services = services;
	}

}
